//import org.apache.flink.api.common.io.FilePathFilter;
//import org.apache.flink.api.common.time.Time;
//import org.apache.flink.streaming.api.TimeCharacteristic;
//import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
//import static java.util.logging.Level.WARNING;
//
//public class UsingWatermarkStrategies
//{
//
//
//    public static void main(String[] args) throws Exception
//    {
//
//
//        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//
//
//
//        class MyEvent{}
//
//        MyEvent> stream = env.readFile(
//                myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
//                FilePathFilter.createDefaultFilter(), typeInfo);
//
//        DataStream<MyEvent> withTimestampsAndWatermarks = stream
//                .filter( event -> event.severity() == WARNING )
//                .assignTimestampsAndWatermarks(<watermark strategy>);
//
//        withTimestampsAndWatermarks
//                .keyBy( (event) -> event.getGroup() )
//                .timeWindow(Time.seconds(10))
//                .reduce( (a, b) -> a.add(b) )
//                .addSink(...);
//
//
//    }
//}
